package cn.doitedu.rtmk.demo.demo2;

import com.alibaba.fastjson.JSON;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.Setter;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.state.*;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;

import java.util.Map;

/**
 * @Author: deep as the sea
 * @Site: <a href="www.51doit.com">Doit Education</a>
 * @QQ: 657270652
 * @Date: 2023/2/9
 * @Desc: Learn big data at Doit Education
 * <p>
 * Demo of an event-driven, rule-based, automated real-time marketing system.
 * <p>
 * Pipeline overview:
 * <ul>
 *   <li>User behavior events are read from the Kafka topic {@code rtmk-test-data} and keyed by user id.</li>
 *   <li>Rule parameters are read from the Kafka topic {@code rtmk-rule-param}, broadcast, and stored in
 *       broadcast state under the key {@code rule-001}.</li>
 *   <li>{@code item_share} events are appended to a per-user list state with a 5 minute TTL,
 *       {@code item_like} events to a per-user list state with a 30 minute TTL.</li>
 *   <li>When an event matches the rule's trigger (event id plus a numeric property strictly greater than the
 *       configured threshold), the user's offline profile ({@code age}, {@code gender}, {@code tg03}) is read
 *       from the HBase table {@code user_profile} and compared with the rule, then the two list states are
 *       counted and compared with the rule's dynamic thresholds.</li>
 *   <li>If every condition holds, an {@code RtmkMessage} is emitted.</li>
 * </ul>
 * <p>
 * -- Sample user behavior log data
 * {"userId":1,"eventId":"event1","properties":{},"timeStamp":1675935310000}
 * {"userId":1,"eventId":"add_cart","properties":{"item_price":150},"timeStamp":1675935311000}
 * {"userId":1,"eventId":"item_share","properties":{"item_price":150},"timeStamp":1675935312000}
 * {"userId":1,"eventId":"item_like","properties":{"item_price":150},"timeStamp":1675935313000}
 * {"userId":1,"eventId":"item_share","properties":{"item_price":150},"timeStamp":1675935314000}
 * {"userId":1,"eventId":"item_share","properties":{"item_price":150},"timeStamp":1675935315000}
 * {"userId":1,"eventId":"add_cart","properties":{"item_price":150},"timeStamp":1675935316000}
 * {"userId":2,"eventId":"event1","properties":{},"timeStamp":1675935310000}
 * {"userId":2,"eventId":"add_cart","properties":{"item_price":150},"timeStamp":1675935311000}
 * {"userId":2,"eventId":"item_share","properties":{"item_price":150},"timeStamp":1675935312000}
 * {"userId":2,"eventId":"item_like","properties":{"item_price":150},"timeStamp":1675935313000}
 * {"userId":2,"eventId":"item_share","properties":{"item_price":150},"timeStamp":1675935314000}
 * {"userId":2,"eventId":"item_share","properties":{"item_price":150},"timeStamp":1675935315000}
 * {"userId":2,"eventId":"add_cart","properties":{"item_price":150},"timeStamp":1675935316000}
 *
 * -- Sample rule parameter data
 * {"fireEventId":"add_cart","fileEventPropertyKey":"item_price","fileEventPropertyValue":100,"ageConditionMin":20,"ageConditionMax":40,"genderCondition":"male","tg03Condition":1000,"dynamicConditionValue1":2,"dynamicConditionvalue2":0}
 **/
public class Demo2 {
    /**
     * Builds and runs the streaming job.
     *
     * @param args command-line arguments (not used)
     * @throws Exception if the job cannot be built or fails while executing
     */
    public static void main(String[] args) throws Exception {
        // Set up the execution environment.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(2000);
        env.setParallelism(1);
        env.getCheckpointConfig().setCheckpointStorage("file:/d:/ckpt");

        // Read the detailed user behavior log from Kafka.
        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("doitedu:9092")
                .setStartingOffsets(OffsetsInitializer.latest())
                .setGroupId("gpac01" + System.currentTimeMillis())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .setTopics("rtmk-test-data")
                .build();
        DataStreamSource<String> ds = env.fromSource(source, WatermarkStrategy.noWatermarks(), "s");
        SingleOutputStreamOperator<EventBean> beans = ds.map(json -> JSON.parseObject(json, EventBean.class));

        // Read the rule parameters from Kafka.
        KafkaSource<String> paramDataSource = KafkaSource.<String>builder()
                .setBootstrapServers("doitedu:9092")
                .setStartingOffsets(OffsetsInitializer.latest())
                .setGroupId("gpac01" + System.currentTimeMillis())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .setTopics("rtmk-rule-param")
                .build();
        DataStreamSource<String> paramData = env.fromSource(paramDataSource, WatermarkStrategy.noWatermarks(), "s");
        SingleOutputStreamOperator<RuleParamBean> paramBeans = paramData.map(json -> JSON.parseObject(json, RuleParamBean.class));

        // Broadcast the rule parameters so that every parallel instance sees them.
        MapStateDescriptor<String, RuleParamBean> bcStateDesc = new MapStateDescriptor<>("param_bc_state", String.class, RuleParamBean.class);
        BroadcastStream<RuleParamBean> paramBc = paramBeans.broadcast(bcStateDesc);

        SingleOutputStreamOperator<RtmkMessage> res =
                beans.keyBy(EventBean::getUserId)
                        .connect(paramBc)
                        .process(new KeyedBroadcastProcessFunction<Integer, EventBean, RuleParamBean, RtmkMessage>() {
                            // HBase handles are created in open() and released in close().
                            transient Connection connection;
                            transient Table table;

                            // Bytes.toBytes encodes as UTF-8 regardless of the platform default charset.
                            final byte[] FAMILY = Bytes.toBytes("f");
                            final byte[] AGE = Bytes.toBytes("age");
                            final byte[] GENDER = Bytes.toBytes("gender");
                            final byte[] TG03 = Bytes.toBytes("tg03");

                            // Per-user "item_share" events (5 minute TTL).
                            transient ListState<EventBean> con1State;
                            // Per-user "item_like" events (30 minute TTL).
                            transient ListState<EventBean> con2State;

                            /**
                             * Opens the HBase table and registers the two TTL-enabled list states.
                             */
                            @Override
                            public void open(org.apache.flink.configuration.Configuration parameters) throws Exception {
                                Configuration config = HBaseConfiguration.create();
                                config.set("hbase.zookeeper.quorum", "doitedu:2181");
                                connection = ConnectionFactory.createConnection(config);
                                table = connection.getTable(TableName.valueOf("user_profile"));

                                StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Time.minutes(5)).useProcessingTime().build();
                                ListStateDescriptor<EventBean> con1StateDesc =
                                        new ListStateDescriptor<>("con1_state", EventBean.class);
                                con1StateDesc.enableTimeToLive(ttlConfig);
                                con1State = getRuntimeContext().getListState(con1StateDesc);

                                StateTtlConfig ttlConfig2 = StateTtlConfig.newBuilder(Time.minutes(30)).useProcessingTime().build();
                                ListStateDescriptor<EventBean> con2StateDesc =
                                        new ListStateDescriptor<>("con2_state", EventBean.class);
                                // The 30 minute TTL belongs to con2_state; it must be enabled on its own
                                // descriptor before the state handle is obtained.
                                con2StateDesc.enableTimeToLive(ttlConfig2);
                                con2State = getRuntimeContext().getListState(con2StateDesc);
                            }

                            /**
                             * Releases the HBase table and connection acquired in {@link #open}.
                             */
                            @Override
                            public void close() throws Exception {
                                try {
                                    if (table != null) {
                                        table.close();
                                    }
                                } finally {
                                    if (connection != null) {
                                        connection.close();
                                    }
                                }
                            }

                            /**
                             * Records share/like events in keyed state and, when the event is the rule's
                             * trigger and all static and dynamic conditions hold, emits a marketing message.
                             */
                            @Override
                            public void processElement(EventBean eventBean, KeyedBroadcastProcessFunction<Integer, EventBean, RuleParamBean, RtmkMessage>.ReadOnlyContext ctx, Collector<RtmkMessage> out) throws Exception {

                                ReadOnlyBroadcastState<String, RuleParamBean> broadcastState = ctx.getBroadcastState(bcStateDesc);
                                // Fetch the rule parameter object from broadcast state. Until one has been
                                // received, events are dropped without being recorded.
                                RuleParamBean ruleParamBean = broadcastState.get("rule-001");
                                if (ruleParamBean == null) {
                                    return;
                                }

                                System.out.println("当前，规则的参数对象为： " + ruleParamBean);

                                // Maintain the real-time profile tags the rule needs.
                                // Constant-first equals tolerates a null event id.
                                if ("item_share".equals(eventBean.getEventId())) {
                                    con1State.add(eventBean);
                                }
                                if ("item_like".equals(eventBean.getEventId())) {
                                    con2State.add(eventBean);
                                }

                                // Is this the trigger event of the marketing rule
                                // (e.g. add-to-cart with item price above the threshold)?
                                if (!isFireEvent(eventBean, ruleParamBean)) {
                                    return;
                                }
                                System.out.println(eventBean.getUserId() + " 触发了规则");

                                // Offline static profile conditions, looked up in HBase.
                                if (!matchesOfflineProfile(eventBean, ruleParamBean)) {
                                    return;
                                }

                                // Real-time dynamic profile conditions:
                                //   shares within the last 5 minutes  > dynamicConditionValue1
                                //   likes within the last 30 minutes  > dynamicConditionvalue2
                                // The like state is only iterated when the share condition already holds.
                                if (countOf(con1State.get()) > ruleParamBean.dynamicConditionValue1
                                        && countOf(con2State.get()) > ruleParamBean.dynamicConditionvalue2) {
                                    // Every static and dynamic condition is satisfied.
                                    out.collect(new RtmkMessage(eventBean.getUserId(), eventBean.getTimeStamp(), "rule-001"));
                                }
                            }

                            /**
                             * Stores the received rule parameters in broadcast state under {@code rule-001}.
                             */
                            @Override
                            public void processBroadcastElement(RuleParamBean paramBean, KeyedBroadcastProcessFunction<Integer, EventBean, RuleParamBean, RtmkMessage>.Context ctx, Collector<RtmkMessage> out) throws Exception {
                                if (paramBean != null) {
                                    System.out.println("收到规则001的参数信息 ------------");
                                    // Put the parameters received from the broadcast stream into broadcast state.
                                    BroadcastState<String, RuleParamBean> broadcastState = ctx.getBroadcastState(bcStateDesc);
                                    broadcastState.put("rule-001", paramBean);
                                }
                            }

                            // Returns true when the event id equals the rule's fire event id and the configured
                            // property parses to a number strictly greater than the rule's threshold.
                            private boolean isFireEvent(EventBean eventBean, RuleParamBean ruleParamBean) {
                                if (eventBean.getEventId() == null
                                        || !eventBean.getEventId().equals(ruleParamBean.getFireEventId())) {
                                    return false;
                                }
                                if (eventBean.getProperties() == null) {
                                    return false;
                                }
                                String rawValue = eventBean.getProperties().get(ruleParamBean.getFileEventPropertyKey());
                                if (rawValue == null) {
                                    return false;
                                }
                                try {
                                    return Double.parseDouble(rawValue) > ruleParamBean.getFileEventPropertyValue();
                                } catch (NumberFormatException ignored) {
                                    // A non-numeric property value cannot exceed the threshold; treat the
                                    // event as non-triggering instead of failing the whole job.
                                    return false;
                                }
                            }

                            // Reads age / gender / tg03 for the event's user from HBase and checks
                            //   age in (ageConditionMin, ageConditionMax], gender equal to genderCondition,
                            //   tg03 > tg03Condition.
                            // Returns false when the row or any of the three columns is missing.
                            private boolean matchesOfflineProfile(EventBean eventBean, RuleParamBean ruleParamBean) throws Exception {
                                Get get = new Get(Bytes.toBytes(eventBean.getUserId()));
                                Result result = table.get(get);

                                byte[] ageBytes = result.getValue(FAMILY, AGE);
                                byte[] genderBytes = result.getValue(FAMILY, GENDER);
                                byte[] tg03Bytes = result.getValue(FAMILY, TG03);
                                if (ageBytes == null || genderBytes == null || tg03Bytes == null) {
                                    return false;
                                }

                                int age = Bytes.toInt(ageBytes);
                                String gender = Bytes.toString(genderBytes);
                                int tg03 = Bytes.toInt(tg03Bytes);

                                return age > ruleParamBean.getAgeConditionMin()
                                        && age <= ruleParamBean.ageConditionMax
                                        && gender.equals(ruleParamBean.getGenderCondition())
                                        && tg03 > ruleParamBean.getTg03Condition();
                            }

                            // Counts the elements of a list-state iterable.
                            private int countOf(Iterable<EventBean> events) {
                                int count = 0;
                                for (EventBean ignored : events) {
                                    count++;
                                }
                                return count;
                            }
                        });
        res.print();
        env.execute();
    }
}
